UP | HOME

How To Replicate A Reader In Go

在使用 Go 编程时,有时我们会需要多次消费某个 io.Reader 的内容(注意:是同一内容读取多次,而不同于通常的多次分段读取),这就需要我们可以“复制”一个 io.Reader 。 而且某些场景下不太方便将 io.Reader 内容一次性读出,比如将 net.Conn 视为 io.Reader 使用或读取超大文件无法全部加载到内存等情形。 所以在这里我们还是讨论 io.Reader 本身的复制。

对于最简单的消费两次的场景,网上最常见的推荐的写法基本是这样的:

func ReplicateReader(r io.Reader) (r1 io.Reader, r2 io.Reader) {
        buf := bytes.Buffer{}
        teeR := io.TeeReader(r, &buf)
        return teeR, &buf
}

很多人认为这体现了 Go 简洁高效的特点。 某种程度来说,的确如此,如果使用场景是:仅有两个 io.Reader 的使用者,且至少保证有某一个先读的使用者会将内容读取完毕(直至 io.EOF )。

首先,为什么这个写法对于超过两个 io.Reader 的使用者就不简洁高效了? 我们这里假定有 n 个使用者 (n > 2)。 一是因为需要 n - 1 次的调用,当然这点可以很简单地被更好的 ReplicateReader() 的定义解决,换为 func(r io.Reader, n int) []io.Reader 即可。 二则是因为这种写法为每一个 io.TeeReader() 都要准备单独的 buffer ,既然有 n - 1 次 io.TeeReader() 的调用当然也就需要有 n - 1 个独立的 buffer 了,这在内存上就不够高效了(毕竟存储的都是来自同一 io.Reader 的内容)。

其次为什么需要保证有某一个先读的使用者将内容读取完毕呢? 因为 io.TeeReader() 其实是有“主动”、“从动”之分的,以之前这个 ReplicateReader() 写法为例,这里返回的 r2 (buf) 的内容是来自于 r1 (teeR) 读取过的。 这就要求这个“主动”的 io.Reader 的使用者必须要先行读取内容,不然别人就没内容可读;另外它还得读取完所有内容,因为别人无法读到比它更多的内容。 对于 n 个使用者,当然我们可以优化到仅要求其中 1 个使用者来使用返回的“主动”的这个 io.Reader 剩下的使用者则使用返回的其它“从动”的 io.Reader 即可,甚至于为了 ReplicateReader() 调用者方便还可以将这个“主动”的使用者内置不予返回而返回额外的 n 个“从动”的 io.ReaderReplicateReader 来确保这个“主动”使用者的行为。 这样要么对使用场景有要求,要么就不那么高效(多出一个隐藏的使用者)。

想要实现一个更高效的 ReplicateReader() 可以采用以下做法:

  1. 返回的每个 io.Reader 独自维护自己的 offset 但共用一个 buffer
  2. buffer 还有当前 io.Reader 未读取的内容 (offset < len(buffer)) 则(仅)从 buffer 剩余内容读取
  3. buffer 中内容目前已全被当前 io.Reader 读取 (offset == len(buffer)) ,则从原始的 io.Reader 中读取并将内容追加到 buffer 中,如遇 io.EOF 则对 buffer 标记 EOF (将 EOF 视为一种特殊的从原始 io.Reader 读取到的内容)

在这个最简单的策略下我们可以保证这些 ReplicateReader() 返回的 io.Reader 的内容的正确性以及完整性(不论它们的使用者以何种顺序如何读取),并且使用 1 个公共的 buffer + n 个 offset 代替了原来的 n 个独立的 buffer 。 当然就目前而言,这个 buffer 如同貔貅一样只增不减,我们可以进行更进一步的优化。 简单想想就能够意识到其实 buffer 仅需要维护读的最靠前和最靠后的两个 io.Reader 之间的内容即可,并且更进一步的,因为最靠后的使用者读到的内容正是当前 buffer 中的所有内容所以我们也可以认为 buffer 需要维护的是最靠前的 io.Reader 之后的内容。 但一旦想到这里,另一个问题也就浮上了水面:如果有某个使用者提前退出了,那么它的 io.Reader 的 offset 就不会再更新了,因而我们也要为这些使用者设计一种退出机制。

先对 ReplicateReader() 的定义做一下优化,将它定义为 func(r io.Reader, n int) []io.ReadCloser ,这里返回的 io.ReadCloserClose() 是用来标记它提前结束使用的,不过按照 Go 惯常的使用方式读取完毕到 io.EOF 后再调用 Close() 也是兼容的。 既然为使用者引入了退出机制,就可以接着引入其它新的优化策略了:

  1. 引入优先级队列来管理各个 offset ,采用小顶堆,因为我们关心的是最靠前的 offset
  2. 返回的 io.ReadCloserClose() 调用时会将当前使用者从优先级队列中移出
  3. buffer 引入 base 并修改规则 2. 和 3. 的比对条件,如果最靠前的 offsetbuffer 中对应位置已经在后半部分则将 buffer 中的内容前移并更新 base
  4. 如果仅剩一个使用者,当它从原始的 io.Reader 中读取时,就不必再将读到的内容拷贝至 buffer

我按照这个策略实现了一版 ReplicateReader() 放在 repreader.go ,欢迎取用。

P.S. io.Writer 的文档中要求如果没能一次性写入就必须返回一个错误(也就是说没错误的话必须写完传入的内容),不过幸好 io.Reader 没这要求不然读取操作的实现还得更复杂些。

Write must return a non-nil error if it returns n < len(p).